1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.atomic.*;
21
22 import rx.*;
23 import rx.Observable.Operator;
24 import rx.exceptions.*;
25 import rx.functions.Func1;
26 import rx.internal.util.*;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
53
54 private static final class HolderNoDelay {
55
56 static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false);
57 }
58
59 private static final class HolderDelayErrors {
60
61 static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(true);
62 }
63
64
65
66
67 @SuppressWarnings("unchecked")
68 public static <T> OperatorMerge<T> instance(boolean delayErrors) {
69 if (delayErrors) {
70 return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
71 }
72 return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
73 }
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 private OperatorMerge() {
105 this.delayErrors = false;
106 }
107
108 private OperatorMerge(boolean delayErrors) {
109 this.delayErrors = delayErrors;
110 }
111
112 private final boolean delayErrors;
113
114 @Override
115 public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
116 return new MergeSubscriber<T>(child, delayErrors);
117
118 }
119
120 private static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
121 final NotificationLite<T> on = NotificationLite.instance();
122 final Subscriber<? super T> actual;
123 private final MergeProducer<T> mergeProducer;
124 private int wip;
125 private boolean completed;
126 private final boolean delayErrors;
127 private ConcurrentLinkedQueue<Throwable> exceptions;
128
129 private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;
130
131 private volatile RxRingBuffer scalarValueQueue = null;
132
133
134 private int missedEmitting = 0;
135 private boolean emitLock = false;
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
153 super(actual);
154 this.actual = actual;
155 this.mergeProducer = new MergeProducer<T>(this);
156 this.delayErrors = delayErrors;
157
158 actual.add(this);
159 actual.setProducer(mergeProducer);
160 }
161
162 @Override
163 public void onStart() {
164
165
166 request(RxRingBuffer.SIZE);
167 }
168
169
170
171
172 @Override
173 public void onNext(Observable<? extends T> t) {
174 if (t instanceof ScalarSynchronousObservable) {
175 ScalarSynchronousObservable<? extends T> t2 = (ScalarSynchronousObservable<? extends T>)t;
176 handleScalarSynchronousObservable(t2);
177 } else {
178 if (t == null || isUnsubscribed()) {
179 return;
180 }
181 synchronized (this) {
182
183 wip++;
184 }
185 handleNewSource(t);
186 }
187 }
188
189 private void handleNewSource(Observable<? extends T> t) {
190 if (childrenSubscribers == null) {
191
192 childrenSubscribers = new SubscriptionIndexedRingBuffer<InnerSubscriber<T>>();
193 add(childrenSubscribers);
194 }
195 MergeProducer<T> producerIfNeeded = null;
196
197 if (mergeProducer.requested != Long.MAX_VALUE) {
198
199
200
201
202
203
204
205
206
207
208
209
210
211 producerIfNeeded = mergeProducer;
212 }
213 InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
214 i.sindex = childrenSubscribers.add(i);
215 t.unsafeSubscribe(i);
216 if (!isUnsubscribed()) {
217 request(1);
218 }
219 }
220
221 private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240 if (mergeProducer.requested == Long.MAX_VALUE) {
241 handleScalarSynchronousObservableWithoutRequestLimits(t);
242 } else {
243 handleScalarSynchronousObservableWithRequestLimits(t);
244 }
245 }
246
247 private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable<? extends T> t) {
248 T value = t.get();
249 if (getEmitLock()) {
250 boolean moreToDrain;
251 try {
252 actual.onNext(value);
253 } finally {
254 moreToDrain = releaseEmitLock();
255 }
256 if (moreToDrain) {
257 drainQueuesIfNeeded();
258 }
259 request(1);
260 return;
261 } else {
262 try {
263 getOrCreateScalarValueQueue().onNext(value);
264 } catch (MissingBackpressureException e) {
265 onError(e);
266 }
267 return;
268 }
269 }
270
271 private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable<? extends T> t) {
272 if (getEmitLock()) {
273 boolean emitted = false;
274 boolean moreToDrain;
275 boolean isReturn = false;
276 try {
277 long r = mergeProducer.requested;
278 if (r > 0) {
279 emitted = true;
280 actual.onNext(t.get());
281 MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
282
283 isReturn = true;
284 }
285 } finally {
286 moreToDrain = releaseEmitLock();
287 }
288 if (moreToDrain) {
289 drainQueuesIfNeeded();
290 }
291 if (emitted) {
292 request(1);
293 }
294 if (isReturn) {
295 return;
296 }
297 }
298
299
300
301 try {
302 getOrCreateScalarValueQueue().onNext(t.get());
303 } catch (MissingBackpressureException e) {
304 onError(e);
305 }
306 }
307
308 private RxRingBuffer getOrCreateScalarValueQueue() {
309 RxRingBuffer svq = scalarValueQueue;
310 if (svq == null) {
311 svq = RxRingBuffer.getSpscInstance();
312 scalarValueQueue = svq;
313 }
314 return svq;
315 }
316
317 private synchronized boolean releaseEmitLock() {
318 emitLock = false;
319 if (missedEmitting == 0) {
320 return false;
321 } else {
322 return true;
323 }
324 }
325
326 private synchronized boolean getEmitLock() {
327 if (emitLock) {
328 missedEmitting++;
329 return false;
330 } else {
331 emitLock = true;
332 missedEmitting = 0;
333 return true;
334 }
335 }
336
337 private boolean drainQueuesIfNeeded() {
338 while (true) {
339 if (getEmitLock()) {
340 int emitted = 0;
341 boolean moreToDrain;
342 try {
343 emitted = drainScalarValueQueue();
344 drainChildrenQueues();
345 } finally {
346 moreToDrain = releaseEmitLock();
347 }
348
349 if (emitted > 0) {
350 request(emitted);
351 }
352 if (!moreToDrain) {
353 return true;
354 }
355
356 } else {
357 return false;
358 }
359 }
360 }
361
362 int lastDrainedIndex = 0;
363
364
365
366
367 private void drainChildrenQueues() {
368 if (childrenSubscribers != null) {
369 lastDrainedIndex = childrenSubscribers.forEach(DRAIN_ACTION, lastDrainedIndex);
370 }
371 }
372
373
374
375
376 private int drainScalarValueQueue() {
377 RxRingBuffer svq = scalarValueQueue;
378 if (svq != null) {
379 long r = mergeProducer.requested;
380 int emittedWhileDraining = 0;
381 if (r < 0) {
382
383 Object o = null;
384 while ((o = svq.poll()) != null) {
385 on.accept(actual, o);
386 emittedWhileDraining++;
387 }
388 } else if (r > 0) {
389
390 long toEmit = r;
391 for (int i = 0; i < toEmit; i++) {
392 Object o = svq.poll();
393 if (o == null) {
394 break;
395 } else {
396 on.accept(actual, o);
397 emittedWhileDraining++;
398 }
399 }
400
401 MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
402 }
403 return emittedWhileDraining;
404 }
405 return 0;
406 }
407
408 final Func1<InnerSubscriber<T>, Boolean> DRAIN_ACTION = new Func1<InnerSubscriber<T>, Boolean>() {
409
410 @Override
411 public Boolean call(InnerSubscriber<T> s) {
412 if (s.q != null) {
413 long r = mergeProducer.requested;
414 int emitted = s.drainQueue();
415 if (emitted > 0) {
416 s.requestMore(emitted);
417 }
418 if (emitted == r) {
419
420 return Boolean.FALSE;
421 }
422 }
423 return Boolean.TRUE;
424 }
425
426 };
427
428 @Override
429 public void onError(Throwable e) {
430 if (!completed) {
431 completed = true;
432 innerError(e, true);
433 }
434 }
435
436 private void innerError(Throwable e, boolean parent) {
437 if (delayErrors) {
438 synchronized (this) {
439 if (exceptions == null) {
440 exceptions = new ConcurrentLinkedQueue<Throwable>();
441 }
442 }
443 exceptions.add(e);
444 boolean sendOnComplete = false;
445 synchronized (this) {
446 if (!parent) {
447 wip--;
448 }
449 if ((wip == 0 && completed) || (wip < 0)) {
450 sendOnComplete = true;
451 }
452 }
453 if (sendOnComplete) {
454 drainAndComplete();
455 }
456 } else {
457 actual.onError(e);
458 }
459 }
460
461 @Override
462 public void onCompleted() {
463 boolean c = false;
464 synchronized (this) {
465 completed = true;
466 if (wip == 0) {
467 c = true;
468 }
469 }
470 if (c) {
471
472 drainAndComplete();
473 }
474 }
475
476 void completeInner(InnerSubscriber<T> s) {
477 boolean sendOnComplete = false;
478 synchronized (this) {
479 wip--;
480 if (wip == 0 && completed) {
481 sendOnComplete = true;
482 }
483 }
484 childrenSubscribers.remove(s.sindex);
485 if (sendOnComplete) {
486 drainAndComplete();
487 }
488 }
489
490 private void drainAndComplete() {
491 boolean moreToDrain = true;
492 while (moreToDrain) {
493 synchronized (this) {
494 missedEmitting = 0;
495 }
496 drainScalarValueQueue();
497 drainChildrenQueues();
498 synchronized (this) {
499 moreToDrain = missedEmitting > 0;
500 }
501 }
502 RxRingBuffer svq = scalarValueQueue;
503 if (svq == null || svq.isEmpty()) {
504 if (delayErrors) {
505 Queue<Throwable> es = null;
506 synchronized (this) {
507 es = exceptions;
508 }
509 if (es != null) {
510 if (es.isEmpty()) {
511 actual.onCompleted();
512 } else if (es.size() == 1) {
513 actual.onError(es.poll());
514 } else {
515 actual.onError(new CompositeException(es));
516 }
517 } else {
518 actual.onCompleted();
519 }
520 } else {
521 actual.onCompleted();
522 }
523 }
524 }
525
526 }
527
528 private static final class MergeProducer<T> implements Producer {
529
530 private final MergeSubscriber<T> ms;
531
532 public MergeProducer(MergeSubscriber<T> ms) {
533 this.ms = ms;
534 }
535
536 private volatile long requested = 0;
537 @SuppressWarnings("rawtypes")
538 static final AtomicLongFieldUpdater<MergeProducer> REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
539
540 @Override
541 public void request(long n) {
542 if (requested == Long.MAX_VALUE) {
543 return;
544 }
545 if (n == Long.MAX_VALUE) {
546 requested = Long.MAX_VALUE;
547 } else {
548 BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
549 if (ms.drainQueuesIfNeeded()) {
550 boolean sendComplete = false;
551 synchronized (ms) {
552 if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) {
553 sendComplete = true;
554 }
555 }
556 if (sendComplete) {
557 ms.drainAndComplete();
558 }
559 }
560 }
561 }
562
563 }
564
565 private static final class InnerSubscriber<T> extends Subscriber<T> {
566 public int sindex;
567 final MergeSubscriber<T> parentSubscriber;
568 final MergeProducer<T> producer;
569
570 @SuppressWarnings("unused")
571 volatile int terminated;
572 @SuppressWarnings("rawtypes")
573 static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
574
575 private final RxRingBuffer q = RxRingBuffer.getSpscInstance();
576
577 public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
578 this.parentSubscriber = parent;
579 this.producer = producer;
580 add(q);
581 request(q.capacity());
582 }
583
584 @Override
585 public void onNext(T t) {
586 emit(t, false);
587 }
588
589 @Override
590 public void onError(Throwable e) {
591
592 if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
593 parentSubscriber.innerError(e, false);
594 }
595 }
596
597 @Override
598 public void onCompleted() {
599 if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
600 emit(null, true);
601 }
602 }
603
604 public void requestMore(long n) {
605 request(n);
606 }
607
608 private void emit(T t, boolean complete) {
609 boolean drain = false;
610 boolean enqueue = true;
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648 if (parentSubscriber.getEmitLock()) {
649 long emitted = 0;
650 enqueue = false;
651 try {
652
653 emitted += drainQueue();
654
655 if (producer == null) {
656
657 if (complete) {
658 parentSubscriber.completeInner(this);
659 } else {
660 try {
661 parentSubscriber.actual.onNext(t);
662 } catch (Throwable e) {
663
664 onError(OnErrorThrowable.addValueAsLastCause(e, t));
665 }
666 emitted++;
667 }
668 } else {
669
670
671 if (producer.requested > 0 && q.count() == 0) {
672 if (complete) {
673 parentSubscriber.completeInner(this);
674 } else {
675 try {
676 parentSubscriber.actual.onNext(t);
677 } catch (Throwable e) {
678
679 onError(OnErrorThrowable.addValueAsLastCause(e, t));
680 }
681 emitted++;
682 MergeProducer.REQUESTED.decrementAndGet(producer);
683 }
684 } else {
685
686 enqueue = true;
687 }
688 }
689 } finally {
690 drain = parentSubscriber.releaseEmitLock();
691 }
692
693 if(emitted > 0) {
694 request(emitted);
695 }
696 }
697 if (enqueue) {
698 enqueue(t, complete);
699 drain = true;
700 }
701 if (drain) {
702
703
704
705
706
707
708
709
710
711
712 parentSubscriber.drainQueuesIfNeeded();
713 }
714 }
715
716 private void enqueue(T t, boolean complete) {
717 try {
718 if (complete) {
719 q.onCompleted();
720 } else {
721 q.onNext(t);
722 }
723 } catch (MissingBackpressureException e) {
724 onError(e);
725 }
726 }
727
728 private int drainRequested() {
729 int emitted = 0;
730
731 long toEmit = producer.requested;
732 Object o;
733 for (int i = 0; i < toEmit; i++) {
734 o = q.poll();
735 if (o == null) {
736
737 break;
738 } else if (q.isCompleted(o)) {
739 parentSubscriber.completeInner(this);
740 } else {
741 try {
742 if (!q.accept(o, parentSubscriber.actual)) {
743 emitted++;
744 }
745 } catch (Throwable e) {
746
747 onError(OnErrorThrowable.addValueAsLastCause(e, o));
748 }
749 }
750 }
751
752
753 MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
754 return emitted;
755 }
756
757 private int drainAll() {
758 int emitted = 0;
759
760 Object o;
761 while ((o = q.poll()) != null) {
762 if (q.isCompleted(o)) {
763 parentSubscriber.completeInner(this);
764 } else {
765 try {
766 if (!q.accept(o, parentSubscriber.actual)) {
767 emitted++;
768 }
769 } catch (Throwable e) {
770
771 onError(OnErrorThrowable.addValueAsLastCause(e, o));
772 }
773 }
774 }
775 return emitted;
776 }
777
778 private int drainQueue() {
779 if (producer != null) {
780 return drainRequested();
781 } else {
782 return drainAll();
783 }
784 }
785 }
786 }